干货|字节跳动流式数据集成基于Flink Checkpoint两阶段提交的实践和优化(2)
字节跳动开发套件数据集成团队(DTS ,Data Transmission Service)在字节跳动内基于 Flink 实现了流批一体的数据集成服务。其中一个典型场景是 Kafka/ByteMQ/RocketMQ -> HDFS/Hive 。Kafka/ByteMQ/RocketMQ -> HDFS/Hive(下面均称之为 MQ dump,具体介绍可见字节跳动基于Flink的MQ-Hive实时数据集成 在数仓建设第一层,对数据的准确性和实时性要求比较高。
目前字节跳动中国区 MQ dump 例行任务数巨大,日均处理流量在 PB 量级。巨大的任务量和数据量对 MQ dump 的稳定性以及准确性带来了极大的挑战。
本文主要介绍 DTS MQ dump 在极端场景中遇到的数据丢失问题的排查与优化,最后介绍了上线效果。 本文分两次连载,第一篇主要介绍Flink Checkpoint 以及 MQ dump 写入流程。本篇则将重点介绍故障排查和优化方案。
DataLeap
故障排查过程
Flink日志查看
但是查看正式目录下相关文件的信息,我们发现 task 2、3 两个 task 并没有 Checkpoint 4608 的文件(文件名含有 task id 和 Checkpoint id 信息,所以可以根据正式目录下的文件名知道其是哪个 task 在哪个 Checkpoint 期间创建的)。故初步确定的原因是某些文件被误删造成数据丢失。Task 2/3/6/7 在文件删除后由于没有文件的写入和关闭操作,task 正常运行;而 task 0/1/4/5 在文件删除后还有文件的写入和关闭操作,造成 task 失败。
HDFS元数据查看
下一步就要去排查文件丢失的原因。我们通过 HDFS trace 记录表( HDFS trace记录表记录着用户和系统调用行为,以达到分析和运维的目的)查看 task 2 Checkpoint 4608 临时目录操作记录,对应的路径为/xx/_DUMP_TEMPORARY/cp-4608/task-2。
src_path | method | operation_cost_ms | toDateTime(local_timestamp_ms) | result |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | getFileInfo | 2 | 2021/10/31 18:23:02 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 1111895 | 2021/10/31 18:22:09 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 167990 | 2021/10/31 18:10:56 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 107077 | 2021/10/31 18:10:05 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 68885 | 2021/10/31 18:09:57 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 119439 | 2021/10/31 18:08:17 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 148846 | 2021/10/31 18:07:46 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 115081 | 2021/10/31 18:06:52 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2 | delete | 96490 | 2021/10/31 18:05:08 | 1 |
2021-10-31 18:08:58
左右实际有创建两个文件,但是由于删除操作的重复执行造成创建的两个文件被删除。src_path | method | operation_cost_ms | toDateTime(local_timestamp_ms) | result |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/17_xx_2_4608.1635674938391.zstd | complete | 8 | 2021/10/31 18:08:58 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/17_xx_2_4608.1635674938391.zstd | fsync | 10 | 2021/10/31 18:08:58 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/17_xx_2_4608.1635674938391.zstd | addBlock | 9 | 2021/10/31 18:08:58 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/18_xx_2_4608.1635674938482.zstd | complete | 9 | 2021/10/31 18:08:58 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/18_xx_2_4608.1635674938482.zstd | fsync | 8 | 2021/10/31 18:08:58 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/18_xx_2_4608.1635674938482.zstd | addBlock | 24 | 2021/10/31 18:08:58 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/18_xx_2_4608.1635674938482.zstd | create | 12 | 2021/10/31 18:08:58 | 1 |
/xx/_DUMP_TEMPORARY/cp-4608/task-2/date=20211031/17_xx_2_4608.1635674938391.zstd | create | 12 | 2021/10/31 18:08:58 | 1 |
根本原因
18:03:37-18:08:58
一直在尝试调用 HDFS 删除接口删除临时目录,但是由于java.net
.SocketTimeoutException
一直删除失败。在时间点18:08:58
删除操作执行成功。而这个时间点也基本与我们在 HDFS trace 数据中发现删除操作的执行记录时间是对应的。通过日志我们发现建立文件以及关闭文件操作基本都是在18:08:58
这个时间点完成的,这个时间点与 HDFS trace 中的记录也是对应上的。DATALEAP
解决方案方案一:HDFS 保证操作的幂等性
为了解决这个问题,我们首先想到的是 HDFS 保证删除操作的幂等性,这样即使删除操作重复执行也不会影响后续写入的问题,进而可以保证数据的准确性。但是咨询 HDFS 后,HDFS 表示 HDFS在现有架构下无法保证删除的幂等性。
参考 DDIA (Designing Data-Intensive Applications) 第 9 章中关于因果关系的定义:因果关系对事件施加了一种顺序——因在果之前。对应于MQ dump 流程中删除操作是因,发生在写入数据之前。我们需要保证这两个关系的因果关系。而根据其解决因果问题的方法,一种解决思路是 HDFS 在每个client 请求中都带上序列号顺序,进而在HDFS NameNode 上可以保证单个client的请求因果性。跟HDFS 讨论后发现这个方案的实现成本会比较大。
方案二:使用文件 state
了解 HDFS 难以保证操作的幂等性后,我们想是否可以将写入前的删除操作去除,也就是说在写入 HDFS 之前不清理文件夹而是直接写入数据到文件,这样就不需要有因果性的保证。
如果我们知道临时文件夹中哪些文件是我们需要的,在重命名阶段就可以直接将需要的文件重命名到正式目录而忽略临时文件夹中的脏文件,这样在写入之前就不需要删除文件夹。故我们的解决方案是将写入的文件路径存储到 Flink state 中,从而确保在 commit 阶段以及恢复阶段可以将需要的文件移动到正式目录。
最终,我们选择了方案二解决该问题,使用文件 state 前后处理流程对比如下图所示:
目前文件 state 已经在线上使用了,下面先介绍一下实现中碰到的相关问题,然后再描述一下上线后效果。
文件 state 实现细节
可观测性
创建文件的数量:state 中所有文件的数量,也就是当前 Checkpoint 处理数据阶段创建的所有文件数量。
重命名成功文件的数量:NotifyCheckpointComplete 阶段将临时文件成功移动到正式目录下的文件数量。
忽略重命名文件的数量:NotifyCheckpointComplete 阶段忽略移动到正式目录下下的文件数量。也就是临时文件夹中不存在但是正式目录存在的文件。这种情况通常发生在任务有 Failover 的情况。Failover 后任务从 Checkpoint 中恢复,失败前已经重命名成功的文件在当前阶段会忽略重命名。
重命名失败的文件数量:临时目录以及正式目录下都不存在文件的数量。这种情况通常是由于任务发生了异常造成数据的丢失。目前线上比较常见的一个 case 是任务在关闭一段时间后再开启。由于 HDFS TTL 的设置小于任务关闭的时长,临时目录中写入的文件被 HDFS TTL 策略清除。这个结果实际是符合预期的。
前向兼容性
第一期写入数据前保留了删除操作
第二期删除了写入数据前的删除操作
DATALEAP
上线效果切主演练
HDFS 集群正常切主
HDFS 集群主节点失败超过10分钟
性能效果
DATALEAP
字节跳动流式数据集成仍在不断发展中,未来主要关注以下几方面:
功能增强,增加简单的数据转换逻辑,缩短流式数据处理链路,进而减少处理时延
架构升级,离线集成和实时数据集成架构统一
支持 auto scaling 功能,在业务高峰和低峰自动扩缩容,提高资源利用率,减少资源浪费
本文中介绍的《字节跳动流式数据集成基于Flink Checkpoint两阶段提交的实践和优化》,目前已通过火山引擎数据产品大数据研发治理套件 DataLeap 向外部企业输出。
参考文献
字节跳动基于Flink的MQ-Hive实时数据集成
字节跳动单点恢复功能及 Regional CheckPoint 优化实践
Designing Data-Intensive Applications Stateful Stream Processing
产品介绍
火山引擎大数据研发治理套件DataLeap
一站式数据中台套件,帮助用户快速完成数据集成、开发、运维、治理、资产、安全等全套数据中台建设,帮助数据团队有效的降低工作成本和数据维护成本、挖掘数据价值、为企业决策提供数据支撑。后台回复数字“2”了解产品
- End -